package com.atguigu.gmall.realtime.app.dws;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.bean.VisitorStats;
import com.atguigu.gmall.realtime.utils.ClickHouseUtil;
import com.atguigu.gmall.realtime.utils.DateTimeUtil;
import com.atguigu.gmall.realtime.utils.MyKafka;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.Date;

/**
 * DWS job that computes visitor-theme statistics from the already-split log streams.
 *
 * <p>Three Kafka topics are consumed:
 * <ul>
 *   <li>{@code dwd_topic_page}: page logs, each contributing one page view, its duration and,
 *       when it has no previous page, one session;</li>
 *   <li>{@code dwm_unique_visit}: unique visitors, each contributing one UV;</li>
 *   <li>{@code dwm_user_jump_details}: jump-outs (bounces), each contributing one UJ.</li>
 * </ul>
 * Every record is mapped to a {@link VisitorStats} carrying its single counter. The streams are
 * unioned, keyed by (version, channel, area, is_new), and summed in 10-second tumbling event-time
 * windows. The window bounds are stamped on each result, which is then written to ClickHouse.
 */
public class VisitorStatsApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 Basic environment setup
        //1.1 Stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        //1.2 Checkpointing: every 5 s, exactly-once
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        //Restart strategy: at most 5 restarts, 5 s apart
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(5, 5000L));
        //Where checkpoint data is stored
        env.setStateBackend(new FsStateBackend("hdfs://hadoop104:8020/gmall/gmall/flink/checkpoint"));
        //HDFS user used when writing checkpoints
        System.setProperty("HADOOP_USER_NAME", "atguigu");
        //Abort checkpoints that take longer than 60 s
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        //TODO 2 Read the source topics from Kafka (one consumer group for the whole job)
        String groupId = "VisitorStatsApp";
        //2.1 DWD page-log topic
        String pageTopic = "dwd_topic_page";
        FlinkKafkaConsumer<String> pageSource = MyKafka.getFlinkKafkaConsumer(pageTopic, groupId);
        DataStreamSource<String> pageDStream = env.addSource(pageSource);
        //2.2 DWM unique-visitor topic
        String uvTopic = "dwm_unique_visit";
        FlinkKafkaConsumer<String> uvSource = MyKafka.getFlinkKafkaConsumer(uvTopic, groupId);
        DataStreamSource<String> uvDStream = env.addSource(uvSource);
        //2.3 DWM user jump-out topic
        String userJumpTopic = "dwm_user_jump_details";
        FlinkKafkaConsumer<String> userJumpSource = MyKafka.getFlinkKafkaConsumer(userJumpTopic, groupId);
        DataStreamSource<String> userJumpDStream = env.addSource(userJumpSource);

        //TODO 3 Convert every stream to VisitorStats so they can be unioned into one wide stream
        //3.1 Page log: one page view, its duration, and one session when there is no previous page
        SingleOutputStreamOperator<VisitorStats> pageVisitorDStream = pageDStream.map(
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String jsonStr) throws Exception {
                        JSONObject jsonObject = JSON.parseObject(jsonStr);
                        JSONObject pageObj = jsonObject.getJSONObject("page");
                        // A page with no predecessor is the first page of a session, so it counts
                        // one session (sv_ct). pv_ct is always 1 for a page record.
                        String lastPageId = pageObj.getString("last_page_id");
                        long svCt = (lastPageId == null || lastPageId.isEmpty()) ? 1L : 0L;
                        // Default a missing duration to 0 so the window reducer never unboxes null.
                        Long duringTime = pageObj.getLong("during_time");
                        long durSum = duringTime == null ? 0L : duringTime;
                        return toVisitorStats(jsonObject, 0L, 1L, svCt, 0L, durSum);
                    }
                }
        );
        //3.2 Unique visitor: one UV
        SingleOutputStreamOperator<VisitorStats> uniqueVisitorDStream = uvDStream.map(
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String jsonStr) throws Exception {
                        return toVisitorStats(JSONObject.parseObject(jsonStr), 1L, 0L, 0L, 0L, 0L);
                    }
                }
        );
        //3.3 User jump-out: one UJ
        SingleOutputStreamOperator<VisitorStats> userJumpDetailsDStream = userJumpDStream.map(
                new MapFunction<String, VisitorStats>() {
                    @Override
                    public VisitorStats map(String jsonStr) throws Exception {
                        return toVisitorStats(JSONObject.parseObject(jsonStr), 0L, 0L, 0L, 1L, 0L);
                    }
                }
        );
        //Debug: check that each stream is read and converted
        //pageVisitorDStream.print("页面page转换>>>>>");
        //uniqueVisitorDStream.print("独立访客流转换>>>>");
        //userJumpDetailsDStream.print("用户跳出转换>>>>>");

        //TODO 4 Union the three streams into one
        DataStream<VisitorStats> unionDStream = pageVisitorDStream.union(
                uniqueVisitorDStream,
                userJumpDetailsDStream
        );

        //TODO 5 Event time = the record's ts, tolerating 3 s of out-of-orderness
        // NOTE(review): records later than the watermark are dropped by the event-time window.
        // If the jump-out topic is emitted with a delay by its upstream job, 3 s may be too small.
        // Confirm against the upstream detector.
        SingleOutputStreamOperator<VisitorStats> visitorWithWaterMarkDStream = unionDStream.assignTimestampsAndWatermarks(
                WatermarkStrategy.<VisitorStats>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                        .withTimestampAssigner(new SerializableTimestampAssigner<VisitorStats>() {
                            @Override
                            public long extractTimestamp(VisitorStats visitorStats, long recordTimestamp) {
                                return visitorStats.getTs();
                            }
                        })
        );

        //TODO 6 Key by the dimensions: Tuple4<version, channel, area, is_new>
        // An anonymous class (not a lambda) keeps the generic Tuple4 key type visible to Flink.
        KeyedStream<VisitorStats, Tuple4<String, String, String, String>> visitorKeyedDStream = visitorWithWaterMarkDStream.keyBy(
                new KeySelector<VisitorStats, Tuple4<String, String, String, String>>() {
                    @Override
                    public Tuple4<String, String, String, String> getKey(VisitorStats visitorStats) throws Exception {
                        return Tuple4.of(visitorStats.getVc(), visitorStats.getCh(), visitorStats.getAr(), visitorStats.getIs_new());
                    }
                }
        );

        //TODO 7 10-second tumbling event-time windows
        WindowedStream<VisitorStats, Tuple4<String, String, String, String>, TimeWindow> windowDStream =
                visitorKeyedDStream.window(TumblingEventTimeWindows.of(Time.seconds(10)));

        //TODO 8 Sum the counters incrementally, then stamp the window bounds
        SingleOutputStreamOperator<VisitorStats> reduceDStream = windowDStream.reduce(
                new ReduceFunction<VisitorStats>() {
                    @Override
                    public VisitorStats reduce(VisitorStats value1, VisitorStats value2) throws Exception {
                        value1.setPv_ct(value1.getPv_ct() + value2.getPv_ct());
                        value1.setSv_ct(value1.getSv_ct() + value2.getSv_ct());
                        value1.setUv_ct(value1.getUv_ct() + value2.getUv_ct());
                        value1.setUj_ct(value1.getUj_ct() + value2.getUj_ct());
                        value1.setDur_sum(value1.getDur_sum() + value2.getDur_sum());
                        return value1;
                    }
                },
                new ProcessWindowFunction<VisitorStats, VisitorStats, Tuple4<String, String, String, String>, TimeWindow>() {
                    @Override
                    public void process(Tuple4<String, String, String, String> key, Context context, Iterable<VisitorStats> elements, Collector<VisitorStats> out) throws Exception {
                        // The bounds are the same for every element, so format them once.
                        String stt = DateTimeUtil.toYMDhms(new Date(context.window().getStart()));
                        String edt = DateTimeUtil.toYMDhms(new Date(context.window().getEnd()));
                        // With a ReduceFunction in front, elements holds the single reduced value.
                        for (VisitorStats visitorStats : elements) {
                            visitorStats.setStt(stt);
                            visitorStats.setEdt(edt);
                            out.collect(visitorStats);
                        }
                    }
                }
        );
        //Debug output
        reduceDStream.print(">>>");

        //TODO 9 Write the aggregated rows to ClickHouse (12 columns, in VisitorStats field order)
        String sql = "insert into visitor_stats_1021 values(?,?,?,?,?,?,?,?,?,?,?,?)";
        reduceDStream.addSink(
                ClickHouseUtil.getJdbcSink(sql)
        );

        env.execute();
    }

    /**
     * Builds a single-record {@link VisitorStats} from a parsed log record.
     *
     * <p>The window bounds (stt/edt) are left empty and filled in after windowing. The dimensions
     * vc/ch/ar/is_new come from the record's {@code common} object; ts is the record's top-level
     * {@code ts}.
     *
     * @param jsonObject parsed record containing a {@code common} object and a {@code ts} field
     * @param uvCt       unique-visitor count contributed by this record
     * @param pvCt       page-view count contributed by this record
     * @param svCt       session count contributed by this record
     * @param ujCt       jump-out count contributed by this record
     * @param durSum     page duration contributed by this record
     * @return a new VisitorStats carrying the given counters
     */
    private static VisitorStats toVisitorStats(JSONObject jsonObject, Long uvCt, Long pvCt, Long svCt, Long ujCt, Long durSum) {
        JSONObject common = jsonObject.getJSONObject("common");
        return new VisitorStats(
                //window start, set after windowing
                "",
                //window end, set after windowing
                "",
                //version
                common.getString("vc"),
                //channel
                common.getString("ch"),
                //area
                common.getString("ar"),
                //new/returning visitor flag
                common.getString("is_new"),
                uvCt,
                pvCt,
                svCt,
                ujCt,
                durSum,
                jsonObject.getLong("ts")
        );
    }

}
